1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.*;
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.Mock;
27 import org.mockito.MockitoAnnotations;
28
29 import rx.Observable;
30 import rx.Observer;
31 import rx.Subscriber;
32 import rx.exceptions.OnErrorNotImplementedException;
33 import rx.functions.Action1;
34 import rx.functions.Func1;
35
36 import java.util.List;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 public class OperatorDoOnEachTest {
40
41 @Mock
42 Observer<String> subscribedObserver;
43 @Mock
44 Observer<String> sideEffectObserver;
45
46 @Before
47 public void before() {
48 MockitoAnnotations.initMocks(this);
49 }
50
51 @Test
52 public void testDoOnEach() {
53 Observable<String> base = Observable.just("a", "b", "c");
54 Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
55
56 doOnEach.subscribe(subscribedObserver);
57
58
59 verify(subscribedObserver, never()).onError(any(Throwable.class));
60 verify(subscribedObserver, times(1)).onNext("a");
61 verify(subscribedObserver, times(1)).onNext("b");
62 verify(subscribedObserver, times(1)).onNext("c");
63 verify(subscribedObserver, times(1)).onCompleted();
64
65
66 verify(sideEffectObserver, never()).onError(any(Throwable.class));
67 verify(sideEffectObserver, times(1)).onNext("a");
68 verify(sideEffectObserver, times(1)).onNext("b");
69 verify(sideEffectObserver, times(1)).onNext("c");
70 verify(sideEffectObserver, times(1)).onCompleted();
71 }
72
73 @Test
74 public void testDoOnEachWithError() {
75 Observable<String> base = Observable.just("one", "fail", "two", "three", "fail");
76 Observable<String> errs = base.map(new Func1<String, String>() {
77 @Override
78 public String call(String s) {
79 if ("fail".equals(s)) {
80 throw new RuntimeException("Forced Failure");
81 }
82 return s;
83 }
84 });
85
86 Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
87
88 doOnEach.subscribe(subscribedObserver);
89 verify(subscribedObserver, times(1)).onNext("one");
90 verify(subscribedObserver, never()).onNext("two");
91 verify(subscribedObserver, never()).onNext("three");
92 verify(subscribedObserver, never()).onCompleted();
93 verify(subscribedObserver, times(1)).onError(any(Throwable.class));
94
95 verify(sideEffectObserver, times(1)).onNext("one");
96 verify(sideEffectObserver, never()).onNext("two");
97 verify(sideEffectObserver, never()).onNext("three");
98 verify(sideEffectObserver, never()).onCompleted();
99 verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
100 }
101
102 @Test
103 public void testDoOnEachWithErrorInCallback() {
104 Observable<String> base = Observable.just("one", "two", "fail", "three");
105 Observable<String> doOnEach = base.doOnNext(new Action1<String>() {
106 @Override
107 public void call(String s) {
108 if ("fail".equals(s)) {
109 throw new RuntimeException("Forced Failure");
110 }
111 }
112 });
113
114 doOnEach.subscribe(subscribedObserver);
115 verify(subscribedObserver, times(1)).onNext("one");
116 verify(subscribedObserver, times(1)).onNext("two");
117 verify(subscribedObserver, never()).onNext("three");
118 verify(subscribedObserver, never()).onCompleted();
119 verify(subscribedObserver, times(1)).onError(any(Throwable.class));
120
121 }
122
123 @Test
124 public void testIssue1451Case1() {
125
126 final int expectedCount = 3;
127 final AtomicInteger count = new AtomicInteger();
128 for (int i=0; i < expectedCount; i++) {
129 Observable
130 .just(Boolean.TRUE, Boolean.FALSE)
131 .takeWhile(new Func1<Boolean, Boolean>() {
132 @Override
133 public Boolean call(Boolean value) {
134 return value;
135 }
136 })
137 .toList()
138 .doOnNext(new Action1<List<Boolean>>() {
139 @Override
140 public void call(List<Boolean> booleans) {
141 count.incrementAndGet();
142 }
143 })
144 .subscribe();
145 }
146 assertEquals(expectedCount, count.get());
147 }
148
149 @Test
150 public void testIssue1451Case2() {
151
152 final int expectedCount = 3;
153 final AtomicInteger count = new AtomicInteger();
154 for (int i=0; i < expectedCount; i++) {
155 Observable
156 .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
157 .takeWhile(new Func1<Boolean, Boolean>() {
158 @Override
159 public Boolean call(Boolean value) {
160 return value;
161 }
162 })
163 .toList()
164 .doOnNext(new Action1<List<Boolean>>() {
165 @Override
166 public void call(List<Boolean> booleans) {
167 count.incrementAndGet();
168 }
169 })
170 .subscribe();
171 }
172 assertEquals(expectedCount, count.get());
173 }
174
175 @Test
176 public void testFatalError() {
177 try {
178 Observable.just(1, 2, 3)
179 .flatMap(new Func1<Integer, Observable<?>>() {
180 @Override
181 public Observable<?> call(Integer integer) {
182 return Observable.create(new Observable.OnSubscribe<Object>() {
183 @Override
184 public void call(Subscriber<Object> o) {
185 throw new NullPointerException("Test NPE");
186 }
187 });
188 }
189 })
190 .doOnNext(new Action1<Object>() {
191 @Override
192 public void call(Object o) {
193 System.out.println("Won't come here");
194 }
195 })
196 .subscribe();
197 fail("should have thrown an exception");
198 } catch (OnErrorNotImplementedException e) {
199 assertTrue(e.getCause() instanceof NullPointerException);
200 assertEquals(e.getCause().getMessage(), "Test NPE");
201 System.out.println("Received exception: " + e);
202 }
203 }
204 }